Source code for nlp_architect.solutions.trend_analysis.trend_analysis

# ******************************************************************************
# Copyright 2017-2018 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
import argparse
import csv
import logging
import operator
import os
import sys
from os import path
from pathlib import Path
from shutil import copyfile

import numpy
import pandas as pd
from sklearn.manifold import TSNE

from nlp_architect import LIBRARY_OUT
from nlp_architect.utils.embedding import FasttextEmbeddingsModel
from nlp_architect.utils.io import check_size, validate_existing_filepath
from nlp_architect.utils.text import simple_normalizer

dir = str(LIBRARY_OUT / 'trend-analysis-data')
logger = logging.getLogger(__name__)
target_topics_path = path.join(dir, 'target_topics.csv')
ref_topics_path = path.join(dir, 'ref_topics.csv')
target_scores_path = path.join(dir, 'target_scores.csv')
ref_scores_path = path.join(dir, 'ref_scores.csv')


def analyze(target_data, ref_data, tar_header, ref_header, top_n=10000,
            top_n_vectors=500, re_analysis=False, tfidf_w=0.5, cval_w=0.5,
            lm_w=0):
    """
    Compare a topics list of a target data to a topics list of a reference
    data and extract hot topics, trends and clusters. Topic lists can be
    generated by running topic_extraction.py

    Args:
        target_data: A list of topics with importance scores extracted from
            the target corpus
        ref_data: A list of topics with importance scores extracted from the
            reference corpus
        tar_header: The header to appear for the target topics graphs
        ref_header: The header to appear for the reference topics graphs
        top_n (int): Limit the analysis to only the top N phrases of each list
        top_n_vectors (int): The number of vectors to include in the scatter
        re_analysis (Boolean): whether a first analysis has already been made or not
        tfidf_w (Float): the TF_IDF weight for the final score calculation
        cval_w (Float): the C_Value weight for the final score calculation
        lm_w (Float): the Language-Model weight for the final score calculation
    """
    hash2group = {}
    rep2rank = {}  # the merged list of groups extracted from both corpora.
    # Will be sorted by their rank.
    in_model_count = 0
    create_clusters = False
    try:
        if not re_analysis:  # first analysis, not through ui
            copyfile(target_data, target_topics_path)  # copying the initial
            # scores file to have a const filename for the ui to recognize
            # when reanalyzing
            copyfile(ref_data, ref_topics_path)
        calc_scores(target_data, tfidf_w, cval_w, lm_w, target_scores_path)
        calc_scores(ref_data, tfidf_w, cval_w, lm_w, ref_scores_path)

        # unify all topics:
        with open(ref_data, encoding='utf-8', errors='ignore') as f:
            topics1 = sum(1 for _ in f)
        with open(target_data, encoding='utf-8', errors='ignore') as f:
            topics2 = sum(1 for _ in f)
        sum_topics = topics1 + topics2
        logger.info("sum of all topics= %s", str(sum_topics))
        merge_phrases(ref_scores_path, True, hash2group, rep2rank, top_n,
                      sum_topics)
        merge_phrases(target_scores_path, False, hash2group, rep2rank, top_n,
                      sum_topics)
        logger.info("Total number of evaluated topics: %s",
                    str(len(rep2rank)))
        all_topics_sorted = sorted(rep2rank, key=rep2rank.get)
        top_n_scatter = len(
            all_topics_sorted) if top_n_vectors is None else top_n_vectors

        # compute 2D space clusters if model exists:
        w2v_loc = path.join(dir, 'W2V_Models/model.bin')
        if os.path.isfile(w2v_loc):
            scatter_group = all_topics_sorted[0:top_n_scatter]
            np_scat, x_scat, y_scat, in_model_count = \
                compute_scatter_subwords(scatter_group, w2v_loc)
            if np_scat is not None and x_scat is not None:
                create_clusters = True
                for j in range(len(np_scat)):
                    hash2group[simple_normalizer(np_scat[j])] += (x_scat[j],
                                                                  y_scat[j])

        # prepare reports data:
        groups_r = list(
            filter(lambda x: x[2] == 0 or x[2] == 2, hash2group.values()))
        groups_t = list(
            filter(lambda x: x[2] == 1 or x[2] == 2, hash2group.values()))
        trends = list(
            filter(lambda x: x[2] == 2, hash2group.values()))  # all trends
        groups_r_sorted = sorted(
            groups_r, key=operator.itemgetter(1), reverse=True)
        groups_t_sorted = sorted(
            groups_t, key=operator.itemgetter(3), reverse=True)
        trends_sorted = sorted(
            trends, key=operator.itemgetter(6), reverse=True)  # sort by t_score

        # save results:
        save_report_data(hash2group, groups_r_sorted, groups_t_sorted,
                         trends_sorted, all_topics_sorted, create_clusters,
                         tar_header, ref_header, tfidf_w, cval_w, lm_w,
                         in_model_count, top_n_scatter)
        logger.info('Done analysis.')
    except Exception as e:
        logger.error(str(e))

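# A minimal usage sketch (not part of the original module): the file names and
# report headers below are hypothetical placeholders; analyze() expects the
# raw topic-score CSVs produced beforehand by topic_extraction.py. The helper
# is defined only for illustration and is never called at import time.
def _example_analyze():
    analyze('store/target_topics_raw.csv', 'store/ref_topics_raw.csv',
            tar_header='Q2 reviews', ref_header='Q1 reviews',
            top_n=5000, top_n_vectors=300)
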
def save_report_data(hash2group, groups_r_sorted, groups_t_sorted,
                     trends_sorted, all_topics_sorted, create_clusters,
                     target_header, ref_header, tfidf_w, cval_w, freq_w,
                     in_model_count, top_n_scatter):
    """
    Write the analysis results into the graph-data and filter-phrases CSV
    report files consumed by the UI.
    """
    filter_output = path.join(dir, 'filter_phrases.csv')
    data_output = path.join(dir, 'graph_data.csv')
    logger.info('writing results to: %s and: %s ', str(data_output),
                str(filter_output))
    reports_column = [
        'Top Topics (' + ref_header + ')',
        'Top Topics (' + target_header + ')',
        'Hot Trends', 'Cold Trends', 'Custom Trends'
    ]
    if create_clusters:
        reports_column.extend([
            'Trend Clustering',
            'Topic Clustering (' + ref_header + ')',
            'Topic Clustering (' + target_header + ')'
        ])
    headers = [
        'reports', 'ref_topic', 'ref_imp', 'x_ref', 'y_ref', 'tar_topic',
        'tar_imp', 'x_tar', 'y_tar', 'trends', 'change', 't_score', 'x_tre',
        'y_tre', 'weights', 'in_w2v_model_count', 'top_n_scatter'
    ]
    filter_headers = [
        'topics', 'valid', 'custom'
    ]
    weights = [tfidf_w, cval_w, freq_w]
    with open(data_output, 'wt', encoding='utf-8') as data_file:
        with open(filter_output, 'wt', encoding='utf-8') as filter_file:
            data_writer = csv.writer(data_file, delimiter=',')
            filter_writer = csv.writer(filter_file, delimiter=',')
            data_writer.writerow(headers)
            filter_writer.writerow(filter_headers)
            for i in range(len(hash2group.keys())):
                try:
                    new_row = ()
                    new_row += (reports_column[i],) if i < len(
                        reports_column) else ('',)
                    if i < len(groups_r_sorted):
                        new_row += (
                            groups_r_sorted[i][0], groups_r_sorted[i][1])
                        if groups_r_sorted[i][2] == 0:  # 'only b' type tuple
                            new_row += (
                                groups_r_sorted[i][-2],
                                groups_r_sorted[i][-1]) if len(
                                    groups_r_sorted[i]) > 4 else (-1, -1)
                        else:  # 'trend' type tuple
                            new_row += (
                                groups_r_sorted[i][-2],
                                groups_r_sorted[i][-1]) if len(
                                    groups_r_sorted[i]) > 7 else (-1, -1)
                    else:
                        new_row += ('', '', '', '')
                    if len(groups_t_sorted) > i:
                        new_row += (
                            groups_t_sorted[i][0], groups_t_sorted[i][3])
                        if groups_t_sorted[i][2] == 1:  # 'only a' type tuple
                            new_row += (groups_t_sorted[i][-2],
                                        groups_t_sorted[i][-1]) if len(
                                            groups_t_sorted[i]) > 5 \
                                else (-1, -1)
                        else:
                            new_row += (groups_t_sorted[i][-2],
                                        groups_t_sorted[i][-1]) if len(
                                            groups_t_sorted[i]) > 7 \
                                else (-1, -1)
                    else:
                        new_row += ('', '', '', '')
                    if len(trends_sorted) > i:
                        new_row += (
                            trends_sorted[i][0], trends_sorted[i][4],
                            trends_sorted[i][6])
                        new_row += (
                            trends_sorted[i][-2],
                            trends_sorted[i][-1]) if len(
                                trends_sorted[i]) > 7 else (-1, -1)
                    else:
                        new_row += ('', '', '', '')
                    new_row += (weights[i],) if i < len(weights) else ('',)
                    new_row += (in_model_count, top_n_scatter) if i == 0 \
                        else ('',)
                    data_writer.writerow(new_row)
                    filter_row = (all_topics_sorted[i], 1, 0)
                    filter_writer.writerow(filter_row)
                except Exception as e:
                    logger.error(
                        "Error while writing analysis results. "
                        "iteration #: %s. Error: %s", str(i), str(e))
            if len(hash2group.keys()) < len(reports_column):
                for i in range(len(hash2group.keys()), len(reports_column)):
                    new_row = ()
                    new_row += (reports_column[i], '', '', '', '', '', '', '',
                                '', '', '', '', '', '', '', '', '')
                    data_writer.writerow(new_row)

def compute_scatter_subwords(top_groups, w2v_loc):
    """
    Compute 2D vectors of the provided phrases groups

    Args:
        top_groups: A list of group-representative phrases (List(String))
        w2v_loc: A path to a w2v model (String)

    Returns:
        A tuple (phrases, x, y, n) WHERE
        phrases: A list of phrases that are part of the model
        x: A DataFrame column as the x values of the phrase vector
        y: A DataFrame column as the y values of the phrase vector
        n: The number of computed vectors
    """
    x_feature, np_indices = [], []
    in_ctr = 0
    try:
        if path.isfile(w2v_loc):
            logger.info("computing scatter on %s groups",
                        str(len(top_groups)))
            model = FasttextEmbeddingsModel.load(w2v_loc)
            for a in top_groups:
                x_feature.append(model[a])
                np_indices.append(a)
                in_ctr += 1
            logger.info("topics found in model out of %s: %s.",
                        str(len(top_groups)), str(in_ctr))
            if len(np_indices) == 0:
                logger.error('no vectors extracted')
                return None, None, None, 0
            logger.info('computing TSNE embedding...')
            tsne = TSNE(n_components=2, random_state=0, method='exact')
            numpy.set_printoptions(suppress=True)
            x_tsne = tsne.fit_transform(x_feature)
            df = pd.DataFrame(
                x_tsne, index=np_indices, columns=['x', 'y'])
            return np_indices, df['x'], df['y'], in_ctr
        logger.error('no model found for computing scatter, skipping step.')
        return None, None, None, 0
    except Exception as e:
        logger.error(str(e))
        return None, None, None, 0  # keep the caller's 4-way unpacking valid

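# A hedged usage sketch (the phrases and model location are illustrative):
# assuming a fastText embeddings model was trained earlier into the library's
# output directory, the top phrases can be projected to 2D for the scatter
# reports. The helper is illustrative only and never called at import time.
def _example_scatter():
    phrases, x, y, found = compute_scatter_subwords(
        ['deep learning', 'neural network', 'speech recognition'],
        path.join(dir, 'W2V_Models/model.bin'))
    if phrases is not None:
        logger.info('projected %s phrases to 2D', str(found))
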
def calc_scores(scores_file, tfidf_w, cval_w, lm_w, output_path):
    """
    Given a topic list with tf_idf,c_value,language_model scores, compute
    a final score for each phrases group according to the given weights

    Args:
        scores_file (String): A path to the file with groups and raw scores
        tfidf_w (Float): the TF_IDF weight for the final score calculation
        cval_w (Float): the C_Value weight for the final score calculation
        lm_w (Float): the Language-Model weight for the final score calculation
        output_path: A path for the output file of final scores (String)
    """
    logger.info("calculating scores for file: %s with: tfidf_w=%s, cval_w=%s,"
                " freq_w=%s", scores_file, str(tfidf_w), str(cval_w),
                str(lm_w))
    with open(scores_file, encoding='utf-8', errors='ignore') as csv_file:
        lines = csv.reader((x.replace('\0', '') for x in csv_file),
                           delimiter=',')
        final_list = []
        for group, tfidf, cval, freq in lines:
            score = (float(tfidf) * tfidf_w) + (float(cval) * cval_w) \
                + (float(freq) * lm_w)
            final_list.append((group, score))
        sorted_list = sorted(final_list, key=lambda tup: tup[1], reverse=True)
    save_scores_list(sorted_list, output_path)

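# A hedged worked example of the weighting above (the row and numbers are
# illustrative): with tfidf_w=0.5, cval_w=0.5 and lm_w=0, an input row
# "deep learning,0.8,0.6,0.4" gets 0.8 * 0.5 + 0.6 * 0.5 + 0.4 * 0 = 0.7.
def _example_final_score(tfidf=0.8, cval=0.6, freq=0.4):
    return (tfidf * 0.5) + (cval * 0.5) + (freq * 0)  # == 0.7
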
def save_scores_list(scores, file_path):
    """
    Save the list of topics-scores into a file

    Args:
        scores: A list of topics (groups) with scores
        file_path: The output file path
    """
    logger.info('saving np extraction results to: %s', file_path)
    with open(file_path, 'wt', encoding='utf-8', newline='') as csv_file:
        writer = csv.writer(csv_file, delimiter=',')
        ctr = 0
        for topics, imp in scores:
            try:
                row = (topics, str(imp))
                writer.writerow(row)
                ctr += 1
            except Exception as e:
                logger.error("Error while writing scores to file. "
                             "iteration #: %s. Error: %s", str(ctr), str(e))

def merge_phrases(data, is_ref_data, hash2group, rep2rank, top_n,
                  topics_count):
    """
    Analyze the provided topics data and detect trends (changes in importance)

    Args:
        data: A list of topics with importance scores
        is_ref_data (Boolean): Was the data extracted from the
            target/reference corpus
        hash2group: A dictionary storing the data of each topic
        rep2rank: A dict of all groups representatives and their ranks
        top_n (int): Limit the analysis to only the top N phrases of each list
        topics_count (int): The total sum of all topics extracted from both
            corpora
    """
    logger.info('merge and compare groups for data: %s', str(data))
    ctr = 0
    if not Path(data).exists():
        logger.error('invalid csv file: %s', str(data))
        sys.exit()
    try:
        with open(data, encoding='utf-8', errors='ignore') as csv_file:
            topics = csv.reader(csv_file, delimiter=',')
            for group, imp in topics:
                if ctr == top_n:
                    break
                try:
                    rep = clean_group(group).strip()
                    imp = float(imp) * 100.0
                    rank = ctr + 1
                    hash_id = simple_normalizer(rep)
                    if hash_id not in hash2group:
                        rep2rank[rep] = rank
                        if is_ref_data:
                            hash2group[hash_id] = (rep, imp, 0, rank)
                        else:
                            hash2group[hash_id] = (rep, 0, 1, imp, rank)
                    elif not is_ref_data:
                        data_b = hash2group[hash_id]
                        if data_b[2] == 0:
                            # create a trend only in comparison to the ref
                            # topics; ignore cases where different topics get
                            # the same hash or the same topic was extracted
                            # twice from the same data
                            old_rep = data_b[0]
                            old_rank = data_b[3]
                            rep2rank[old_rep] = int((rank + old_rank) / 2)
                            # the rank of a topic that appears in both corpora
                            # is calculated as the average of its two ranks
                            change = float(imp) - float(data_b[1])
                            t_score = (topics_count - (old_rank + rank)) \
                                * abs(change)
                            hash2group[hash_id] = (
                                old_rep, float(data_b[1]), 2, imp, change,
                                abs(change), t_score)  # trend phrase
                    ctr += 1
                except Exception as e:
                    logger.error('bad line: %s. Error: %s', str(ctr), str(e))
    except Exception as e:
        logger.error('Error: %s. Is %s a valid csv file?', str(e), str(data))
        sys.exit()

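# A hedged worked example of the trend score above (all numbers illustrative):
# with topics_count=500, a topic ranked 3rd in the reference list and 40th in
# the target list whose importance moves from 2.1 to 0.6 gets
#   t_score = (500 - (3 + 40)) * abs(0.6 - 2.1) = 457 * 1.5 = 685.5
# so large importance changes in highly ranked topics dominate the hot/cold
# trend reports.
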
def clean_group(phrase_group):
    """
    Returns the shortest element in a group of phrases

    Args:
        phrase_group (String): a group of phrases separated by ';'

    Returns:
        The shortest phrase in the group (String)
    """
    text = [x.lstrip() for x in phrase_group.split(';')]
    return min(text, key=len)

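# For illustration (a hypothetical input): the shortest surface form is used
# as the group representative, e.g.
# clean_group('convolutional neural networks;cnn;convnets') returns 'cnn'.
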
if __name__ == '__main__':
    parser = argparse.ArgumentParser(prog='trend_analysis.py')
    parser.add_argument('target_topics', metavar='target_topics',
                        type=validate_existing_filepath,
                        help='a path to a csv topic-list extracted from the '
                             'target corpus')
    parser.add_argument('ref_topics', metavar='ref_topics',
                        type=validate_existing_filepath,
                        help='a path to a csv topic-list extracted from the '
                             'reference corpus')
    parser.add_argument('--top_n', type=int, action=check_size(0, 100000),
                        default=10000,
                        help='compare only top N topics (default: 10000)')
    parser.add_argument('--top_vectors', type=int,
                        action=check_size(0, 100000), default=500,
                        help='include only top N vectors in the scatter '
                             'graph (default: 500)')
    args = parser.parse_args()
    analyze(args.target_topics, args.ref_topics, args.target_topics,
            args.ref_topics, args.top_n, args.top_vectors)
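# A hedged command-line sketch (the CSV names are placeholders for the output
# of topic_extraction.py):
#   python trend_analysis.py target_topics.csv ref_topics.csv \
#       --top_n 5000 --top_vectors 300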